package day05;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;

/**
 * Flink Table API 与 SQL —— 输出到其他外部系统
 *
 * @author lvbingbing
 * @date 2022-01-18 22:16
 */
public class FlinkTableApi04 {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the demo output ordered and in a single file/partition.
        int parallelism = 1;
        env.setParallelism(parallelism);
        // 2. Write the table results to external systems (Elasticsearch, MySQL).
        studyWriteToAnother(env);
        // 3. Trigger the job execution.
        env.execute();
    }

    /**
     * Reads sensor records from a CSV file, filters one sensor,
     * and writes the result to external systems (Elasticsearch and MySQL).
     *
     * @param env the stream execution environment to build the table environment on
     */
    private static void studyWriteToAnother(StreamExecutionEnvironment env) {
        // 1. Create the table environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2. Register the file source as a table.
        //    NOTE(review): "timestamp" is declared DOUBLE; sensor timestamps are
        //    conventionally epoch longs (BIGINT) — confirm against input/sensor.txt.
        tableEnv.connect(new FileSystem().path("input/sensor.txt"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.DOUBLE())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");
        Table inputTable = tableEnv.from("inputTable");
        // Keep only sensor_6 rows; the string literal must be single-quoted,
        // otherwise the expression parser treats sensor_6 as a field reference
        // and the query fails validation at runtime.
        Table resTable = inputTable.select("id, temp")
                .where("id = 'sensor_6'");
        // 3. Write the result to Elasticsearch.
        writeToEs(tableEnv, resTable);
        // 4. Write the result to MySQL.
        writeToMysql(tableEnv, resTable);
    }

    /**
     * Registers an Elasticsearch sink table and writes {@code resTable} into it.
     * The sink schema must match the schema of {@code resTable}: (id STRING, temp DOUBLE).
     *
     * @param tableEnv the table environment used to register the sink
     * @param resTable the result table to emit; expected fields: id (STRING), temp (DOUBLE)
     */
    private static void writeToEs(StreamTableEnvironment tableEnv, Table resTable) {
        // Upsert mode requires a unique key; documents are written as JSON.
        tableEnv.connect(new Elasticsearch()
                        .version("6")
                        .host("hadoop102", 9200, "http")
                        .index("sensor")
                        .documentType("temp"))
                .inUpsertMode()
                .withFormat(new Json())
                // Sink schema must mirror resTable (id, temp); a mismatched
                // field list such as (id, count) fails insertInto validation.
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("esOutputTable");
        resTable.insertInto("esOutputTable");
    }

    /**
     * Creates a JDBC sink table via DDL and writes {@code resTable} into it.
     * The DDL column list must match the schema of {@code resTable}: (id, temp).
     *
     * @param tableEnv the table environment used to execute the DDL
     * @param resTable the result table to emit; expected fields: id (STRING), temp (DOUBLE)
     */
    private static void writeToMysql(StreamTableEnvironment tableEnv, Table resTable) {
        // Columns must mirror resTable (id STRING, temp DOUBLE); a mismatched
        // definition such as (id, cnt bigint) fails insertInto validation.
        String sql = "create table jdbcOutputTable (" +
                "id varchar(20) not null," +
                "temp double not null" +
                ") with (" +
                "'connector.type' = 'jdbc'," +
                "'connector.url' = 'jdbc:mysql://hadoop102:3306/test'," +
                "'connector.table' = 'sensor_temp'," +
                "'connector.driver' = 'com.mysql.jdbc.Driver'," +
                "'connector.username' = 'root'," +
                "'connector.password' = '139559'" +
                ")";
        // Execute the DDL to register the sink table, then emit the result.
        tableEnv.sqlUpdate(sql);
        resTable.insertInto("jdbcOutputTable");
    }
}